// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on front of the queue must be done by a single thread (owner),
// operations on back of the queue can be done by multiple threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically continuous (can wraparound, but no stray
// occupied elements). Owner operates on one end of this region, remote thread
// operates on the other end. Synchronization between these threads
// (potential consumption of the last element and claiming of the last empty
// element) happens by means of a state variable in each element. The states
// are: empty, busy (in the process of insertion or removal) and ready.
// Threads claim elements (empty->busy and ready->busy transitions) by means
// of a CAS operation. The finishing transitions (busy->empty and busy->ready)
// are done with a plain store, as the element is exclusively owned by the
// current thread.
//
// Note: we could permit only pointers as elements; then we would not need a
// separate state variable, as a null/non-null pointer value would serve as
// the state. But that would require a malloc/free per operation for large,
// complex values (and this is designed to store std::function<void()>).
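//
// Illustrative sketch (not part of the implementation) of the per-element
// claim protocol described above, for some element e:
//
//   uint8_t s = e->state.load(std::memory_order_relaxed);
//   // Only one thread can win the ready->busy (or empty->busy) CAS.
//   if (s == kReady && e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) {
//     Work w = std::move(e->w);                          // exclusive access
//     e->state.store(kEmpty, std::memory_order_release); // finishing transition
//   }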
template<typename Work, unsigned kSize>
class RunQueue
{
  public:
	RunQueue()
		: front_(0)
		, back_(0)
	{
		// require power-of-two for fast masking
		eigen_plain_assert((kSize & (kSize - 1)) == 0);
		eigen_plain_assert(kSize > 2);			 // why would you do this?
		eigen_plain_assert(kSize <= (64 << 10)); // leave enough space for counter
		for (unsigned i = 0; i < kSize; i++)
			array_[i].state.store(kEmpty, std::memory_order_relaxed);
	}

	~RunQueue() { eigen_plain_assert(Size() == 0); }

	// PushFront inserts w at the beginning of the queue.
	// If the queue is full, returns w; otherwise returns a default-constructed Work.
	Work PushFront(Work w)
	{
		unsigned front = front_.load(std::memory_order_relaxed);
		Elem* e = &array_[front & kMask];
		uint8_t s = e->state.load(std::memory_order_relaxed);
		if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
			return w;
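		// +1 advances the rolling index; +2*kSize bumps the modification
		// counter stored in the bits above the index (see front_/back_ below).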
		front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
		e->w = std::move(w);
		e->state.store(kReady, std::memory_order_release);
		return Work();
	}

	// PopFront removes and returns the first element in the queue.
	// If the queue was empty, returns a default-constructed Work.
	Work PopFront()
	{
		unsigned front = front_.load(std::memory_order_relaxed);
		Elem* e = &array_[(front - 1) & kMask];
		uint8_t s = e->state.load(std::memory_order_relaxed);
		if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
			return Work();
		Work w = std::move(e->w);
		e->state.store(kEmpty, std::memory_order_release);
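		// Retreat front's rolling index modulo 2*kSize while leaving the
		// modification counter bits untouched.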
		front = ((front - 1) & kMask2) | (front & ~kMask2);
		front_.store(front, std::memory_order_relaxed);
		return w;
	}

	// PushBack adds w at the end of the queue.
	// If the queue is full, returns w; otherwise returns a default-constructed Work.
	Work PushBack(Work w)
	{
		std::unique_lock<std::mutex> lock(mutex_);
		unsigned back = back_.load(std::memory_order_relaxed);
		Elem* e = &array_[(back - 1) & kMask];
		uint8_t s = e->state.load(std::memory_order_relaxed);
		if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
			return w;
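		// Grow the queue towards lower indices: retreat back's rolling index
		// modulo 2*kSize, leaving the modification counter bits untouched.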
		back = ((back - 1) & kMask2) | (back & ~kMask2);
		back_.store(back, std::memory_order_relaxed);
		e->w = std::move(w);
		e->state.store(kReady, std::memory_order_release);
		return Work();
	}

	// PopBack removes and returns the last element in the queue.
	// If the queue was empty, returns a default-constructed Work.
	Work PopBack()
	{
		if (Empty())
			return Work();
		std::unique_lock<std::mutex> lock(mutex_);
		unsigned back = back_.load(std::memory_order_relaxed);
		Elem* e = &array_[back & kMask];
		uint8_t s = e->state.load(std::memory_order_relaxed);
		if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
			return Work();
		Work w = std::move(e->w);
		e->state.store(kEmpty, std::memory_order_release);
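		// +1 advances back's rolling index past the consumed element;
		// +2*kSize bumps the modification counter.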
		back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
		return w;
	}

	// PopBackHalf removes the last half of the elements in the queue and
	// appends them to *result. Returns the number of elements removed.
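	// For illustration: with size 5, mid = back + 2, so the loop below visits
	// the 3 elements at rolling indices back..back+2; in general PopBackHalf
	// removes at most ceil(size / 2) elements from the back (stealing) end.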
	unsigned PopBackHalf(std::vector<Work>* result)
	{
		if (Empty())
			return 0;
		std::unique_lock<std::mutex> lock(mutex_);
		unsigned back = back_.load(std::memory_order_relaxed);
		unsigned size = Size();
		unsigned mid = back;
		if (size > 1)
			mid = back + (size - 1) / 2;
		unsigned n = 0;
		unsigned start = 0;
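		// Walk from mid down to back, inclusive; the signed difference keeps
		// the comparison correct across rolling-index wraparound.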
		for (; static_cast<int>(mid - back) >= 0; mid--) {
			Elem* e = &array_[mid & kMask];
			uint8_t s = e->state.load(std::memory_order_relaxed);
			if (n == 0) {
				if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
					continue;
				start = mid;
			} else {
				// Note: no need to store the transient kBusy state; we
				// exclusively own these elements.
				eigen_plain_assert(s == kReady);
			}
			result->push_back(std::move(e->w));
			e->state.store(kEmpty, std::memory_order_release);
			n++;
		}
		if (n != 0)
			back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
		return n;
	}

	// Size returns current queue size.
	// Can be called by any thread at any time.
	unsigned Size() const { return SizeOrNotEmpty<true>(); }

	// Empty tests whether the queue is empty.
	// Can be called by any thread at any time.
	bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

	// Flush removes all elements from the queue.
	// Must be called by the owner thread, as it uses PopFront.
	void Flush()
	{
		while (!Empty()) {
			PopFront();
		}
	}

  private:
	static const unsigned kMask = kSize - 1;
	static const unsigned kMask2 = (kSize << 1) - 1;
	struct Elem
	{
		std::atomic<uint8_t> state;
		Work w;
	};
	enum
	{
		kEmpty,
		kBusy,
		kReady,
	};
	std::mutex mutex_;
	// The low log2(kSize) + 1 bits in front_ and back_ contain the rolling
	// index of the front/back, respectively. The remaining bits contain
	// modification counters that are incremented whenever the corresponding
	// index advances (on PushFront for front_ and on PopBack for back_).
	// This allows us to (1) distinguish between empty and full conditions
	// (if we used only log2(kSize) bits for the position, these conditions
	// would be indistinguishable); (2) obtain a consistent snapshot of
	// front_/back_ for the Size operation using the modification counters.
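	// For illustration, with kSize = 4: kMask = 0x3 and kMask2 = 0x7, so the
	// low 3 bits of front_ hold a rolling index in [0, 8) and the remaining
	// bits hold the counter. PushFront then adds 1 + 8: the +1 advances the
	// index and the +8 (the weight of the first counter bit) increments the
	// counter.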
	std::atomic<unsigned> front_;
	std::atomic<unsigned> back_;
	Elem array_[kSize];

	// SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
	// only whether the size is 0 is guaranteed to be correct.
	// Can be called by any thread at any time.
	template<bool NeedSizeEstimate>
	unsigned SizeOrNotEmpty() const
	{
		// Emptiness plays a critical role in thread pool blocking, so we go to
		// great effort to not produce false positives (claiming a non-empty
		// queue as empty).
		unsigned front = front_.load(std::memory_order_acquire);
		for (;;) {
			// Capture a consistent snapshot of front/back.
			unsigned back = back_.load(std::memory_order_acquire);
			unsigned front1 = front_.load(std::memory_order_relaxed);
			if (front != front1) {
				front = front1;
				std::atomic_thread_fence(std::memory_order_acquire);
				continue;
			}
			if (NeedSizeEstimate) {
				return CalculateSize(front, back);
			} else {
				// This value will be 0 if the queue is empty, and undefined otherwise.
				unsigned maybe_zero = ((front ^ back) & kMask2);
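				// Rationale: when the queue is empty the front and back
				// rolling indices coincide, and when it is full they differ
				// by kSize, which is non-zero modulo 2*kSize; hence the XOR
				// of the low bits is zero exactly when the queue is empty.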
				// The queue size estimate must agree with the maybe_zero check
				// on the queue empty/non-empty state.
				eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
				return maybe_zero;
			}
		}
	}

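	// CalculateSize computes the queue size from a consistent snapshot of the
	// front/back rolling indices, as captured by SizeOrNotEmpty.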
	EIGEN_ALWAYS_INLINE
	unsigned CalculateSize(unsigned front, unsigned back) const
	{
		int size = (front & kMask2) - (back & kMask2);
		// Fix wraparound of the modular index difference.
		if (size < 0)
			size += 2 * kSize;
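		// E.g. with kSize = 4 (kMask2 = 7): front low bits 1 and back low
		// bits 6 give 1 - 6 = -5, corrected to -5 + 8 = 3 elements.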
		// Order of modification in push/pop is crafted to make the queue look
		// larger than it is during concurrent modifications. E.g. push can
		// increment size before the corresponding pop has decremented it.
		// So the computed size can be up to kSize + 1, fix it.
		if (size > static_cast<int>(kSize))
			size = kSize;
		return static_cast<unsigned>(size);
	}

	RunQueue(const RunQueue&) = delete;
	void operator=(const RunQueue&) = delete;
};
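
// Usage sketch (illustrative; Work = std::function<void()> matches what the
// queue was designed to store, but any default-constructible movable type
// works):
//
//   RunQueue<std::function<void()>, 1024> q;
//   // Owner thread:
//   std::function<void()> rejected = q.PushFront(std::move(work));
//   if (rejected) { /* queue was full; rejected holds the item */ }
//   std::function<void()> next = q.PopFront(); // empty function if queue empty
//   // Any other thread (e.g. a work stealer):
//   std::function<void()> stolen = q.PopBack();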

} // namespace Eigen

#endif // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
